
package com.thread;

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import com.config.CommandParse;
import com.databus.SerialNumGenerator;
import com.google.protobuf.GeneratedMessage;
import com.googlecode.protobuf.format.JsonFormat;
import com.databus.*;
import com.packet.PackageHeader;
import com.packet.PackageHeader.MsgType;
import com.protobuf.MsgExpress;
import com.tool.ObjectConvertor;
import com.tool.Utilities;
import com.tool.ZlibUtils;
import javafx.util.Pair;


public class MsgSendThread implements Runnable {
	
	public MsgSendThread(DataOutputStream out, boolean isZip, int threshold, int queueSize)
	{
		mOutPut    = out;
		mIsZip     = isZip;
		mThreshold = threshold;
		mSendQueueSize = queueSize;
		mSendDataContainer = new LinkedBlockingQueue<byte[]>();
		
		mSendReqNum = 0;
		mSendRspNum = 0;
		mSendPubNum = 0;
	}

	public void SetAddr(int addr)
	{
		mAddr = addr;
	}

	public int GetAddr()
	{
		return mAddr;
	}

	public boolean AddEmptyMsg()
	{
		return mSendDataContainer.add(EXIT_FALG);
	}

	public boolean SendMessage(Object msg, int serNum , Options op)
	{
		Integer cmd = CommandParse.GetCommand(msg.getClass().getName());	
		byte type = (null != op && op.type != 0) ? op.type : MsgType.Request;
		return SendMessageProxy(msg, type, cmd, serNum, op);
	}

	public boolean ReplyMessage(PackageHeader header, Object msg)
	{
		Options op = new Options();
		op.sequence = header.IsSequence();
		op.dstaddr = header.srcaddr;
		op.protocol=(byte)header.GetProtocol();

		int cmd=CommandParse.GetCommand(msg.getClass().getName());

		return SendMessageProxy(msg, MsgType.Response, cmd, header.serialnum , op);
	}


	private boolean SendMessageProxy(Object msg, byte type, int cmd, int serNum, Options op)
	{
		Pair<byte[],MsgExpress.SerializeType> serialData=serialize(msg,op);
		if(serialData.getKey()==null)
			return false;
		byte[] data=serialData.getKey();
		MsgExpress.SerializeType serialType=serialData.getValue();
		if(op!=null)
		    op.protocol=(byte)serialType.getNumber();
		cmd=CommandParse.GetCommand(msg.getClass().getName());
		int size = data.length;
		if (size <= PackageHeader.MAX_SINGLE_PACKAGE_SIZE) 
		{
			return SendMessage(data,0, size,type, cmd, serNum, op,null);
		}
		Log.warn("package is too larger, need multi package");
		boolean result = true;
		int pageSize = size % PackageHeader.MAX_SINGLE_PACKAGE_SIZE == 0 ? 
				size / PackageHeader.MAX_SINGLE_PACKAGE_SIZE
				: ((size / PackageHeader.MAX_SINGLE_PACKAGE_SIZE) + 1);
		if(pageSize > PackageHeader.MAX_TOTAL_PACKAGE)
		{
			Log.error("the message is too large, cann't send");
			return false;
		}
		ByteBuffer msgs = ByteBuffer.wrap(data);
		byte[] buffer = new byte[PackageHeader.MAX_SINGLE_PACKAGE_SIZE];
		for (int i = 1; i <= pageSize; i++) 
		{
			int pageNo = (i == pageSize ? 0 : i);
			int length = msgs.remaining() > PackageHeader.MAX_SINGLE_PACKAGE_SIZE ? PackageHeader.MAX_SINGLE_PACKAGE_SIZE : msgs.remaining();
			msgs.get(buffer, 0, length);
			result = result && SendMessage(buffer, 0, length, type, cmd, serNum, op, pageNo);
		}
		return result;
	}

	public int GetSendQueueSize()
	{
		return mSendDataContainer.size();
	}

	public int GetSendReqNum()
	{
		return mSendReqNum;
	}

	public int GetSendRspNum()
	{
		return mSendRspNum;
	}

	public int GetSendPubNum()
	{
		return mSendPubNum;
	}
	
	public boolean SendMessage(int cmd, byte[] buff, Options op)
	{
		if(null == op)
		{
			Log.error("send cmd message ,op is null");
			return false;
		}
		int queueSize = mSendDataContainer.size();
		if(queueSize >= mSendQueueSize)
		{
			Log.error("send message queue is full");
			return false;
		}
		if(queueSize > 0 && queueSize % 1000 == 0)
		{
			Log.warn("send message queue size = " + queueSize);
		}
		AddCount(op.type, cmd);
		
		PackageHeader header = new PackageHeader();
		header.type = op.type;
		header.serialnum = op.serial;
		header.srcaddr = mAddr;
		header.command = cmd;
		header.SetOption(false,true, op.protocol, op.multicast, op.sequence, op.loadbalance,false);
		header.dstaddr = op.dstaddr;

		ByteArrayOutputStream byteStream = new ByteArrayOutputStream(1024);

		try {
			byteStream.write(buff);
		} catch (IOException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
			return false;
		}
		
		byte[] bodyContent = byteStream.toByteArray();
		byteStream.reset();

		if(mIsZip && bodyContent.length > mThreshold)
		{
			byte[] compressBody = ZlibUtils.compress(bodyContent);
			short compartio = (short) (bodyContent.length / compressBody.length + 1);
			if (compartio > maxCompartio) 
			{
				compartio = 0;
			}
			header.SetCodeInfo(true, false, compartio);
			bodyContent = compressBody;
		}
		header.bodysize = bodyContent.length;
		
		try {
			byteStream.write(header.serialize());
			byteStream.write(bodyContent);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
		return mSendDataContainer.add(byteStream.toByteArray());
	}

	private Pair<byte[],MsgExpress.SerializeType> serialize(Object msg , Options op)
	{
		byte[] buffer = null;

		MsgExpress.SerializeType type=MsgExpress.SerializeType.PROTOBUF;

		if(op==null)
		{
			if(GeneratedMessage.class.isAssignableFrom(msg.getClass())) {
				buffer = ((GeneratedMessage) msg).toByteArray();
				type = MsgExpress.SerializeType.PROTOBUF;
			}
			else
			{
				buffer=ObjectConvertor.BeanToJson(msg).getBytes();
				type=MsgExpress.SerializeType.JSON;
			}
		}
		else if(op.protocol==MsgExpress.SerializeType.PROTOBUF_VALUE && GeneratedMessage.class.isAssignableFrom(msg.getClass())) {
			buffer = ((GeneratedMessage)msg).toByteArray();
			type=MsgExpress.SerializeType.PROTOBUF;
		}
		else if(op.protocol==MsgExpress.SerializeType.JSON_VALUE)
		{
			if(GeneratedMessage.class.isAssignableFrom(msg.getClass())) {
				String str = JsonFormat.printToString((GeneratedMessage)msg);
				try {
					buffer = str.getBytes("utf-8");
					type=MsgExpress.SerializeType.JSON;
				} catch (Exception e) {
					e.printStackTrace();
					Log.error(e);
				}
			}
			else {
				buffer = ObjectConvertor.BeanToJson(msg).getBytes();
				type = MsgExpress.SerializeType.JSON;
			}
		}
		else if(op.protocol==MsgExpress.SerializeType.JAVA_VALUE )
		{
			Log.error("To be implemented");
		}
		else
		{
			Log.error("To be implemented");
		}
		return new Pair<byte[],MsgExpress.SerializeType>(buffer,type);
	}

//	private boolean SendMessage(Object msg, byte type, int cmd, int serNum,Options op)
//	{
//		cmd=CommandParse.GetCommand(msg.getClass().getName());
//		byte[] buffer=serialize(msg,op);
//		return SendMessage(buffer, 0, buffer.length, type, cmd, serNum, op, null);
//	}

	private boolean SendMessage(byte[] msg, int offset, int length, byte type, int cmd, int serNum,  Options op, Integer pageNo)
	{
		int queueSize = mSendDataContainer.size();
		if(queueSize >= mSendQueueSize)
		{
			Log.error("send message queue is full");
			return false;
		}
		if(queueSize > 0 && queueSize % 1000 == 0)
		{
			Log.warn("send message queue size = " + queueSize);
		}
		if(null == pageNo || 0 == pageNo)
		{
			AddCount(type, cmd);
		}
		
		PackageHeader header = new PackageHeader();
		header.type = type;
		header.serialnum = serNum;
		header.srcaddr = mAddr;
		header.command = cmd;
		boolean isMustSync = false;
		if (pageNo != null) {
			header.setMulti(pageNo);
			isMustSync = true;
		}
		if (null == op) {
			header.SetOption(false, false, isMustSync);
			MsgExpress.FunctionInfo fun=CommandParse.GetFunctionInfo(cmd);
			if(fun!=null && fun.getParamsCount()==1)
			    header.SetProtocol((short)fun.getParams(0).getType().getNumber());
		} else {
			header.SetOption(false, true,op.protocol, op.multicast,
					isMustSync? true : op.sequence, op.loadbalance, false);
			header.dstaddr = op.dstaddr;
		}

		ByteArrayOutputStream byteStream = new ByteArrayOutputStream(1024);

		byteStream.write(msg, offset, length);
		byte[] bodyContent = byteStream.toByteArray();
		byteStream.reset();

		if(mIsZip && bodyContent.length > mThreshold && (op==null || op.protocol!=MsgExpress.SerializeType.JSON_VALUE))//协议是json的先不压缩
		{
			byte[] compressBody = ZlibUtils.compress(bodyContent);
			short compartio = (short) (bodyContent.length / compressBody.length + 1);
			if (compartio > maxCompartio) 
			{
				compartio = 0;
			}
			header.SetCodeInfo(true, false, compartio);
			bodyContent = compressBody;
		}
		header.bodysize = bodyContent.length;
		
		try {
			byteStream.write(header.serialize());
			byteStream.write(bodyContent);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		//if(header.IsSequence())
		//    System.out.println("add sequence Msg:"+header.serialnum);
		return mSendDataContainer.add(byteStream.toByteArray());
	}

	protected void AddCount(byte type, int cmd)
	{
		if(0 != CommandParse.GetServiceId(cmd))
		{
			switch(type)
			{
			case MsgType.Request:
			{
				synchronized (mSendReqNum) {
					mSendReqNum++;
				}
			}
			break;
			case MsgType.Response:
			{
				synchronized (mSendRspNum) {
					mSendRspNum++;
				}
			}
			break;
			case MsgType.Publish:
			{
				synchronized (mSendPubNum) {
					mSendPubNum++;
				}
			}
			break;
			default:
			{
				Log.error("Unkonw Msg type , type = " + type);
			}
			}
		}
		else
		{
			if(MsgType.Response == type)
			{
				synchronized (mSendRspNum) {
					mSendRspNum++;
				}
			}
		}
	}

	@Override
	public void run() {
		// TODO Auto-generated method stub
		Log.info("Start Message Send Thread...");
		while(!Thread.interrupted())
		{
			try {
				byte[] sendData = mSendDataContainer.take();
				if(sendData == EXIT_FALG){
					break;
				}
				mOutPut.write(sendData);
				mOutPut.flush();
				//Log.info("send data:"+sendData.length);
			} catch (IOException e) {
				// TODO Auto-generated catch block
//				e.printStackTrace();
				break;
			} catch (InterruptedException e) {
//				e.printStackTrace();
				break;
			}
		}
		Log.info("End Message Send Thread...");
	}

	private DataOutputStream mOutPut;                          // socket鍐欏叆缂撳啿鍖�
	private BlockingQueue<byte[]> mSendDataContainer;          // 鍙戦�佹秷鎭槦鍒�
	private boolean mIsZip;                                    // 鏄惁寮�鍚帇缂�
	private int mThreshold;                                    // 鍘嬬缉闃堝��
	private int mSendQueueSize;                                // 鍙戦�侀槦鍒楁渶澶у��
	private int mAddr;                                         // 鑷韩鍦板潃
	private static byte[] EXIT_FALG = {0,0,0,0,0};             // 绾跨▼閫�鍑虹殑绌烘秷鎭�
	private static short maxCompartio = (short)10000;
	
	private Integer mSendReqNum;                               // 鍙戦�佽姹傛暟
	private Integer mSendRspNum;                               // 鍙戦�佸簲绛旀暟
	private Integer mSendPubNum;                               // 鍙戦�侀�氱煡鏁�
}
